From: Jeroen van der Heijden Date: Tue, 20 Mar 2018 13:27:43 +0000 (+0100) Subject: Work on less io X-Git-Tag: archive/raspbian/2.0.44-1+rpi1~1^2~3^2~9^2~50 X-Git-Url: https://dgit.raspbian.org/%22http://www.example.com/cgi/success//%22http:/www.example.com/cgi/success/?a=commitdiff_plain;h=561b130c9fbf716558b64f0b9c627b145d12a412;p=siridb-server.git Work on less io --- diff --git a/Debug/src/siri/db/subdir.mk b/Debug/src/siri/db/subdir.mk index 395356fd..d2ee8e0e 100644 --- a/Debug/src/siri/db/subdir.mk +++ b/Debug/src/siri/db/subdir.mk @@ -8,6 +8,7 @@ C_SRCS += \ ../src/siri/db/aggregate.c \ ../src/siri/db/auth.c \ ../src/siri/db/buffer.c \ +../src/siri/db/chunk.c \ ../src/siri/db/db.c \ ../src/siri/db/ffile.c \ ../src/siri/db/fifo.c \ diff --git a/Release/src/siri/db/subdir.mk b/Release/src/siri/db/subdir.mk index 493c442e..dfc8f016 100644 --- a/Release/src/siri/db/subdir.mk +++ b/Release/src/siri/db/subdir.mk @@ -8,6 +8,7 @@ C_SRCS += \ ../src/siri/db/aggregate.c \ ../src/siri/db/auth.c \ ../src/siri/db/buffer.c \ +../src/siri/db/chunk.c \ ../src/siri/db/db.c \ ../src/siri/db/ffile.c \ ../src/siri/db/fifo.c \ diff --git a/include/siri/db/chunk.h b/include/siri/db/chunk.h new file mode 100644 index 00000000..e69de29b diff --git a/include/siri/db/insert.h b/include/siri/db/insert.h index 6e289a14..92a081c7 100644 --- a/include/siri/db/insert.h +++ b/include/siri/db/insert.h @@ -24,7 +24,7 @@ typedef enum { - ERR_EXPECTING_ARRAY=-9, + ERR_EXPECTING_ARRAY=-10, ERR_EXPECTING_SERIES_NAME, ERR_EXPECTING_MAP_OR_ARRAY, ERR_EXPECTING_INTEGER_TS, diff --git a/include/siri/db/shard.h b/include/siri/db/shard.h index 8093dcb6..db9194ac 100644 --- a/include/siri/db/shard.h +++ b/include/siri/db/shard.h @@ -88,7 +88,7 @@ int siridb_shard_status(char * str, siridb_shard_t * shard); int siridb_shard_load(siridb_t * siridb, uint64_t id); void siridb_shard_drop(siridb_shard_t * shard, siridb_t * siridb); -long int siridb_shard_write_points( +size_t siridb_shard_write_points( siridb_t * siridb, siridb_series_t * series, siridb_shard_t * shard, diff --git a/src/siri/db/buffer.c b/src/siri/db/buffer.c index 5e7efa63..dce6b5bb 100644 --- a/src/siri/db/buffer.c +++ b/src/siri/db/buffer.c @@ -60,6 +60,12 @@ int siridb_buffer_write_point( uint64_t * ts, qp_via_t * val) { + const size_t sz = sizeof(uint64_t) + sizeof(qp_via_t); + char buf[sz]; + + memcpy(buf, ts, sizeof(uint64_t)); + memcpy(buf + sizeof(uint64_t), val, sizeof(qp_via_t)); + return ( siridb_buffer_write_len(siridb, series) || @@ -68,11 +74,8 @@ int siridb_buffer_write_point( 16 * (series->buffer->len - 1), SEEK_CUR) || - /* write time-stamp */ - fwrite(ts, sizeof(uint64_t), 1, siridb->buffer_fp) != 1 || - - /* write value */ - fwrite(val, sizeof(qp_via_t), 1, siridb->buffer_fp) != 1) ? EOF : 0; + /* write time-stamp and value */ + fwrite(buf, sz, 1, siridb->buffer_fp) != 1) ? EOF : 0; } /* @@ -242,6 +245,9 @@ int siridb_buffer_load(siridb_t * siridb) */ static int BUFFER_use_empty(siridb_t * siridb, siridb_series_t * series) { + const size_t sz = sizeof(uint32_t) + sizeof(size_t); + char buf[sz]; + series->bf_offset = (long int) slist_pop(siridb->empty_buffers); /* jump to the correct buffer position */ @@ -251,19 +257,11 @@ static int BUFFER_use_empty(siridb_t * siridb, siridb_series_t * series) return -1; } - /* write series ID to buffer */ - if (fwrite(&series->id, sizeof(uint32_t), 1, siridb->buffer_fp) != 1) - { - ERR_FILE - return -1; - } + memcpy(buf, &series->id, sizeof(uint32_t)); + memcpy(buf + sizeof(uint32_t), &series->buffer->len, sizeof(size_t)); - /* write 0 length */ - if (fwrite( - &series->buffer->len, - sizeof(size_t), - 1, - siridb->buffer_fp) != 1) + /* write series ID and 0 length to buffer */ + if (fwrite(buf, sz, 1, siridb->buffer_fp) != 1) { ERR_FILE return -1; diff --git a/src/siri/db/chunk.c b/src/siri/db/chunk.c new file mode 100644 index 00000000..e69de29b diff --git a/src/siri/db/insert.c b/src/siri/db/insert.c index 11153308..d6d18710 100644 --- a/src/siri/db/insert.c +++ b/src/siri/db/insert.c @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -194,6 +195,7 @@ ssize_t siridb_insert_assign_pools( { rc = ERR_EXPECTING_MAP_OR_ARRAY; } + return (siri_err) ? ERR_MEM_ALLOC : rc; } diff --git a/src/siri/db/series.c b/src/siri/db/series.c index 5ae9d0c3..8aca61cd 100644 --- a/src/siri/db/series.c +++ b/src/siri/db/series.c @@ -872,7 +872,7 @@ int siridb_series_optimize_shard( end += new_idx; - long int pos; + size_t pos; uint16_t chunk_sz; uint_fast32_t num_chunks, pstart, pend, diff; siridb_shard_get_points_cb get_points_cb = \ @@ -919,7 +919,7 @@ int siridb_series_optimize_shard( pstart, pend, siri.optimize->idx_fp, - &cinfo)) == EOF) + &cinfo)) == 0) { log_critical( "Cannot write points to shard id '%" PRIu64 "'", diff --git a/src/siri/db/shard.c b/src/siri/db/shard.c index b5826ad0..cd76ffd0 100644 --- a/src/siri/db/shard.c +++ b/src/siri/db/shard.c @@ -129,7 +129,7 @@ static int SHARD_load_idx( int is_ts64); static inline int SHARD_init_fn(siridb_t * siridb, siridb_shard_t * shard); static int SHARD_grow(siridb_shard_t * shard); -static int SHARD_write_header( +static size_t SHARD_write_header( siridb_t * siridb, siridb_series_t * series, siridb_points_t * points, @@ -457,9 +457,9 @@ int siridb_shard_status(char * str, siridb_shard_t * shard) * Writes an index and points to a shard. The return value is the position * where the points start in the shard file. * - * If an error has occurred, EOF will be returned and a SIGNAL will be raised. + * If an error has occurred, 0 will be returned and a SIGNAL will be raised. */ -long int siridb_shard_write_points( +size_t siridb_shard_write_points( siridb_t * siridb, siridb_series_t * series, siridb_shard_t * shard, @@ -475,8 +475,7 @@ long int siridb_shard_write_points( unsigned char * cdata = NULL; uint_fast32_t i; - long int pos = EOF; - int header_sz; + size_t pos, header_sz; if (shard->fp->fp == NULL) { @@ -484,7 +483,7 @@ long int siridb_shard_write_points( { ERR_FILE log_critical("Cannot open file '%s'", shard->fn); - return EOF; + return 0; } } fp = shard->fp->fp; @@ -494,8 +493,9 @@ long int siridb_shard_write_points( cdata = siridb_points_zip(points, start, end, cinfo, &dsize); if (cdata == NULL) { + ERR_ALLOC log_critical("Memory allocation error while compressing points"); - return -1; + return 0; } } else if (series->tp == TP_STRING) @@ -504,8 +504,9 @@ long int siridb_shard_write_points( cdata = siridb_points_raw_string(points, start, end, cinfo, &dsize); if (cdata == NULL) { + ERR_ALLOC log_critical("Memory allocation error while compressing points"); - return -1; + return 0; } } else @@ -523,7 +524,7 @@ long int siridb_shard_write_points( if (fseeko(fp, shard->len, SEEK_SET)) { log_critical("Seek error in: '%s'", shard->fn); - return -1; + return 0; } if (idx_fp == NULL || (shard->flags & SIRIDB_SHARD_HAS_NEW_VALUES)) @@ -552,50 +553,47 @@ long int siridb_shard_write_points( pos = shard->len; } - if (header_sz < 0) + if (!header_sz) { ERR_FILE log_critical( "Cannot write index header for shard id %" PRIu64, shard->id); free(cdata); - return EOF; + return 0; } - if (cdata != NULL) + if (cdata == NULL) { - long int rc = fwrite(cdata, dsize, 1, fp); - free(cdata); - if (rc != 1) + size_t p = 0; + size_t ts_sz = siridb->time->ts_sz; + cdata = (unsigned char *) malloc(dsize); + if (cdata == NULL) { - ERR_FILE - log_critical("Cannot write points to file '%s'", shard->fn); - return EOF; + ERR_ALLOC + log_critical("Memory allocation error while compressing points"); + return 0; } - } - else - { + for (i = start; i < end; i++) { - if (fwrite(&points->data[i].ts, siridb->time->ts_sz, 1, fp) != 1 || - fwrite(&points->data[i].val, 8, 1, fp) != 1) - { - ERR_FILE - log_critical("Cannot write points to file '%s'", shard->fn); - return EOF; - } + memcpy(cdata + p, &points->data[i].ts, ts_sz); + p += ts_sz; + memcpy(cdata + p, &points->data[i].val, 8); + p += 8; } } - if (fflush(fp)) + long int rc = fwrite(cdata, dsize, 1, fp); + free(cdata); + if (rc != 1) { ERR_FILE - log_critical("Cannot write flush file '%s'", shard->fn); - return EOF; + log_critical("Cannot write points to file '%s'", shard->fn); + return 0; } shard->len = pos + dsize; - return pos; } @@ -1847,10 +1845,10 @@ static inline int SHARD_init_fn(siridb_t * siridb, siridb_shard_t * shard) * Write a header for a chunk of points. The header can be written to argument * fp which should be a pointer to the index, or the shard file. * - * In case of an error the function return EOF, otherwise the size which is + * In case of an error the function returns 0, otherwise the size which is * written. */ -static int SHARD_write_header( +static size_t SHARD_write_header( siridb_t * siridb, siridb_series_t * series, siridb_points_t * points, @@ -1860,12 +1858,9 @@ static int SHARD_write_header( FILE * fp) { uint16_t len = end - start; - int size = EOF; - - if (fwrite(&series->id, sizeof(uint32_t), 1, fp) != 1) - { - return EOF; - } + size_t size = sizeof(uint32_t); + char buf[24]; + memcpy(buf, &series->id, sizeof(uint32_t)); switch (siridb->time->ts_sz) { @@ -1873,22 +1868,18 @@ static int SHARD_write_header( { uint32_t start_ts = (uint32_t) points->data[start].ts; uint32_t end_ts = (uint32_t) points->data[end - 1].ts; - if (fwrite(&start_ts, sizeof(uint32_t), 1, fp) != 1 || - fwrite(&end_ts, sizeof(uint32_t), 1, fp) != 1) - { - return EOF; - } + memcpy(buf + size, &start_ts, sizeof(uint32_t)); + size += sizeof(uint32_t); + memcpy(buf + size, &end_ts, sizeof(uint32_t)); + size += sizeof(uint32_t); } - size = IDX32_SZ; break; case sizeof(uint64_t): - if (fwrite(&points->data[start].ts, sizeof(uint64_t), 1, fp) != 1 || - fwrite(&points->data[end - 1].ts, sizeof(uint64_t), 1, fp) != 1) - { - return EOF; - } - size = IDX64_SZ; + memcpy(buf + size, &points->data[start].ts, sizeof(uint64_t)); + size += sizeof(uint64_t); + memcpy(buf + size, &points->data[end - 1].ts, sizeof(uint64_t)); + size += sizeof(uint64_t); break; default: @@ -1896,19 +1887,18 @@ static int SHARD_write_header( break; } + memcpy(buf + size, &len, sizeof(uint16_t)); + size += sizeof(uint16_t); - if (fwrite(&len, sizeof(uint16_t), 1, fp) != 1) + if (cinfo != NULL) { - return EOF; + memcpy(buf + size, cinfo, sizeof(uint16_t)); + size += sizeof(uint16_t); } - if (cinfo != NULL) + if (fwrite(buf, size, 1, fp) != 1) { - size += sizeof(uint16_t); - if (fwrite(cinfo, sizeof(uint16_t), 1, fp) != 1) - { - return EOF; - } + return 0; } return size; diff --git a/src/siri/db/shards.c b/src/siri/db/shards.c index bbff50cf..a52480cb 100644 --- a/src/siri/db/shards.c +++ b/src/siri/db/shards.c @@ -137,8 +137,7 @@ int siridb_shards_add_points( uint_fast32_t start, end, num_chunks, pstart, pend; uint16_t chunk_sz; uint16_t cinfo = 0; - size_t size; - long int pos; + size_t size, pos; for (end = 0; end < points->len;) { @@ -188,7 +187,7 @@ int siridb_shards_add_points( pstart, pend, NULL, - &cinfo)) < 0) + &cinfo)) == 0) { log_critical( "Could not write points to shard id %" PRIu64, diff --git a/test/test_insert.py b/test/test_insert.py index 928f8269..c4eedeba 100644 --- a/test/test_insert.py +++ b/test/test_insert.py @@ -50,7 +50,7 @@ class TestInsert(TestBase): await client.insert_some_series(series, timeout=timeout, points=self.GEN_POINTS) await asyncio.sleep(1.0) - @default_test_setup(2, time_precision=TIME_PRECISION, compression=True) + @default_test_setup(2, time_precision=TIME_PRECISION, compression=False) async def run(self): await self.client0.connect()